// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Python bindings for the Kraken WebSocket client.
//!
//! # Design Pattern: Clone and Share State
//!
//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
//! connection state, key fields use `Arc`:
//!
//! - `ws_client: Option<Arc<WebSocketClient>>` - The WebSocket connection.
//! - `subscriptions: Arc<DashMap<String, KrakenWsChannel>>` - Subscription tracking.
//!
//! Without shared state, clones would be independent, causing:
//! - Lost WebSocket messages.
//! - Missing subscription data.
//! - Connection state desynchronization.
//!
//! ## Connection Flow
//!
//! 1. Clone the client for async operation.
//! 2. Connect and populate shared state on the clone.
//! 3. Spawn stream handler as background task.
//! 4. Return immediately (non-blocking).
//!
//! ## Important Notes
//!
//! - Never use `block_on()` - it blocks the runtime.
//! - Always clone before async blocks for lifetime requirements.

use futures_util::StreamExt;
use nautilus_core::python::to_pyruntime_err;
use nautilus_model::{
    data::{BarType, Data, OrderBookDeltas_API},
    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
};
use pyo3::{IntoPyObjectExt, prelude::*};
use tokio_util::sync::CancellationToken;

use crate::{
    common::{
        enums::{KrakenEnvironment, KrakenProductType},
        urls::get_kraken_ws_private_url,
    },
    config::KrakenDataClientConfig,
    websocket::spot_v2::{client::KrakenSpotWebSocketClient, messages::NautilusWsMessage},
};

#[pymethods]
impl KrakenSpotWebSocketClient {
    #[new]
    #[pyo3(signature = (environment=None, private=false, base_url=None, heartbeat_secs=None, api_key=None, api_secret=None))]
    fn py_new(
        environment: Option<KrakenEnvironment>,
        private: bool,
        base_url: Option<String>,
        heartbeat_secs: Option<u64>,
        api_key: Option<String>,
        api_secret: Option<String>,
    ) -> PyResult<Self> {
        let env = environment.unwrap_or(KrakenEnvironment::Mainnet);

        let (resolved_api_key, resolved_api_secret) =
            crate::common::credential::KrakenCredential::resolve_spot(api_key, api_secret)
                .map(|c| c.into_parts())
                .map(|(k, s)| (Some(k), Some(s)))
                .unwrap_or((None, None));

        let (ws_public_url, ws_private_url) = if private {
            // Use provided URL or default to the private endpoint
            let private_url = base_url.unwrap_or_else(|| {
                get_kraken_ws_private_url(KrakenProductType::Spot, env).to_string()
            });
            (None, Some(private_url))
        } else {
            (base_url, None)
        };

        let config = KrakenDataClientConfig {
            environment: env,
            ws_public_url,
            ws_private_url,
            heartbeat_interval_secs: heartbeat_secs,
            api_key: resolved_api_key,
            api_secret: resolved_api_secret,
            ..Default::default()
        };

        let token = CancellationToken::new();

        Ok(KrakenSpotWebSocketClient::new(config, token))
    }

    /// Returns the string produced by the client's `url()` accessor.
    ///
    /// Exposed to Python as the read-only `url` property.
    #[getter]
    #[pyo3(name = "url")]
    #[must_use]
    pub fn py_url(&self) -> &str {
        self.url()
    }

    /// Returns the value reported by the client's `is_connected()`.
    ///
    /// Exposed to Python as the method `is_connected()`.
    #[pyo3(name = "is_connected")]
    fn py_is_connected(&self) -> bool {
        self.is_connected()
    }

    /// Returns the value reported by the client's `is_active()`.
    ///
    /// Exposed to Python as the method `is_active()`.
    #[pyo3(name = "is_active")]
    fn py_is_active(&self) -> bool {
        self.is_active()
    }

    /// Returns the value reported by the client's `is_closed()`.
    ///
    /// Exposed to Python as the method `is_closed()`.
    #[pyo3(name = "is_closed")]
    fn py_is_closed(&self) -> bool {
        self.is_closed()
    }

    /// Returns the subscription strings reported by the client's `get_subscriptions()`.
    ///
    /// Exposed to Python as the method `get_subscriptions()`, yielding a list of `str`.
    #[pyo3(name = "get_subscriptions")]
    fn py_get_subscriptions(&self) -> Vec<String> {
        self.get_subscriptions()
    }

    /// Converts a Python instrument object and hands it to the client's `cache_instrument`.
    ///
    /// # Errors
    ///
    /// Returns the conversion error if `instrument` cannot be turned into an instrument.
    #[pyo3(name = "cache_instrument")]
    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
        let converted = pyobject_to_instrument_any(py, instrument)?;
        self.cache_instrument(converted);
        Ok(())
    }

    /// Passes `account_id` to the client's `set_account_id`.
    ///
    /// Exposed to Python as the method `set_account_id(account_id)`.
    #[pyo3(name = "set_account_id")]
    fn py_set_account_id(&self, account_id: AccountId) {
        self.set_account_id(account_id);
    }

    /// Registers a client order with the client's `cache_client_order`.
    ///
    /// `client_order_id`, `instrument_id`, `trader_id` and `strategy_id` are forwarded as
    /// given. `_venue_order_id` is accepted so the Python call shape stays stable, but it is
    /// not forwarded.
    #[pyo3(name = "cache_client_order")]
    fn py_cache_client_order(
        &self,
        client_order_id: ClientOrderId,
        _venue_order_id: Option<VenueOrderId>,
        instrument_id: InstrumentId,
        trader_id: TraderId,
        strategy_id: StrategyId,
    ) {
        // Note: venue_order_id not used for spot yet, but kept for API consistency
        self.cache_client_order(client_order_id, instrument_id, trader_id, strategy_id);
    }

    /// Invokes the client's `cancel_all_requests()`.
    ///
    /// Exposed to Python as the method `cancel_all_requests()`.
    // NOTE(review): presumably this signals the cancellation token handed to the client at
    // construction; confirm against the client implementation.
    #[pyo3(name = "cancel_all_requests")]
    fn py_cancel_all_requests(&self) {
        self.cancel_all_requests();
    }

    #[pyo3(name = "connect")]
    fn py_connect<'py>(
        &mut self,
        py: Python<'py>,
        instruments: Vec<Py<PyAny>>,
        callback: Py<PyAny>,
    ) -> PyResult<Bound<'py, PyAny>> {
        let mut instruments_any = Vec::new();
        for inst in instruments {
            let inst_any = pyobject_to_instrument_any(py, inst)?;
            instruments_any.push(inst_any);
        }

        let mut client = self.clone();

        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            client.connect().await.map_err(to_pyruntime_err)?;

            // Cache instruments after connection is established
            client.cache_instruments(instruments_any);

            let stream = client.stream();

            tokio::spawn(async move {
                tokio::pin!(stream);

                while let Some(msg) = stream.next().await {
                    match msg {
                        NautilusWsMessage::Data(data_vec) => {
                            Python::attach(|py| {
                                for data in data_vec {
                                    let py_obj = data_to_pycapsule(py, data);
                                    call_python(py, &callback, py_obj);
                                }
                            });
                        }
                        NautilusWsMessage::Deltas(deltas) => {
                            Python::attach(|py| {
                                let py_obj = data_to_pycapsule(
                                    py,
                                    Data::Deltas(OrderBookDeltas_API::new(deltas)),
                                );
                                call_python(py, &callback, py_obj);
                            });
                        }
                        NautilusWsMessage::OrderAccepted(event) => {
                            Python::attach(|py| match event.into_py_any(py) {
                                Ok(py_obj) => call_python(py, &callback, py_obj),
                                Err(e) => {
                                    tracing::error!(
                                        "Failed to convert OrderAccepted to Python: {e}"
                                    );
                                }
                            });
                        }
                        NautilusWsMessage::OrderCanceled(event) => {
                            Python::attach(|py| match event.into_py_any(py) {
                                Ok(py_obj) => call_python(py, &callback, py_obj),
                                Err(e) => {
                                    tracing::error!(
                                        "Failed to convert OrderCanceled to Python: {e}"
                                    );
                                }
                            });
                        }
                        NautilusWsMessage::OrderExpired(event) => {
                            Python::attach(|py| match event.into_py_any(py) {
                                Ok(py_obj) => call_python(py, &callback, py_obj),
                                Err(e) => {
                                    tracing::error!(
                                        "Failed to convert OrderExpired to Python: {e}"
                                    );
                                }
                            });
                        }
                        NautilusWsMessage::OrderUpdated(event) => {
                            Python::attach(|py| match event.into_py_any(py) {
                                Ok(py_obj) => call_python(py, &callback, py_obj),
                                Err(e) => {
                                    tracing::error!(
                                        "Failed to convert OrderUpdated to Python: {e}"
                                    );
                                }
                            });
                        }
                        NautilusWsMessage::OrderStatusReport(report) => {
                            Python::attach(|py| match (*report).into_py_any(py) {
                                Ok(py_obj) => call_python(py, &callback, py_obj),
                                Err(e) => {
                                    tracing::error!(
                                        "Failed to convert OrderStatusReport to Python: {e}"
                                    );
                                }
                            });
                        }
                        NautilusWsMessage::FillReport(report) => {
                            Python::attach(|py| match (*report).into_py_any(py) {
                                Ok(py_obj) => call_python(py, &callback, py_obj),
                                Err(e) => {
                                    tracing::error!("Failed to convert FillReport to Python: {e}");
                                }
                            });
                        }
                        NautilusWsMessage::Reconnected => {
                            tracing::info!("WebSocket reconnected");
                        }
                    }
                }
            });

            Ok(())
        })
    }

    #[pyo3(name = "wait_until_active")]
    fn py_wait_until_active<'py>(
        &self,
        py: Python<'py>,
        timeout_secs: f64,
    ) -> PyResult<Bound<'py, PyAny>> {
        let client = self.clone();

        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            client
                .wait_until_active(timeout_secs)
                .await
                .map_err(to_pyruntime_err)?;
            Ok(())
        })
    }

    #[pyo3(name = "authenticate")]
    fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        let client = self.clone();

        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            client.authenticate().await.map_err(to_pyruntime_err)?;
            Ok(())
        })
    }

    #[pyo3(name = "disconnect")]
    fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        let mut client = self.clone();

        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            client.disconnect().await.map_err(to_pyruntime_err)?;
            Ok(())
        })
    }

    #[pyo3(name = "send_ping")]
    fn py_send_ping<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        let client = self.clone();

        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            client.send_ping().await.map_err(to_pyruntime_err)?;
            Ok(())
        })
    }

    #[pyo3(name = "close")]
    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        let mut client = self.clone();

        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            client.close().await.map_err(to_pyruntime_err)?;
            Ok(())
        })
    }

    /// Returns an awaitable that runs the client's `subscribe_book(instrument_id, depth)`.
    ///
    /// `depth` is forwarded unchanged. It may be omitted from Python, in which case `None`
    /// is forwarded; the explicit signature keeps the argument optional on PyO3 versions
    /// that no longer give `Option<T>` parameters an implicit default.
    ///
    /// The awaitable resolves to `None` on success and fails with the error produced by
    /// `to_pyruntime_err` otherwise.
    #[pyo3(name = "subscribe_book")]
    #[pyo3(signature = (instrument_id, depth=None))]
    fn py_subscribe_book<'py>(
        &self,
        py: Python<'py>,
        instrument_id: InstrumentId,
        depth: Option<u32>,
    ) -> PyResult<Bound<'py, PyAny>> {
        // The future must be `'static`, so it owns a clone rather than borrowing `self`
        let client = self.clone();

        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            client
                .subscribe_book(instrument_id, depth)
                .await
                .map_err(to_pyruntime_err)?;
            Ok(())
        })
    }

    #[pyo3(name = "subscribe_quotes")]
    fn py_subscribe_quotes<'py>(
        &self,
        py: Python<'py>,
        instrument_id: InstrumentId,
    ) -> PyResult<Bound<'py, PyAny>> {
        let client = self.clone();

        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            client
                .subscribe_quotes(instrument_id)
                .await
                .map_err(to_pyruntime_err)?;
            Ok(())
        })
    }

    #[pyo3(name = "subscribe_trades")]
    fn py_subscribe_trades<'py>(
        &self,
        py: Python<'py>,
        instrument_id: InstrumentId,
    ) -> PyResult<Bound<'py, PyAny>> {
        let client = self.clone();

        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            client
                .subscribe_trades(instrument_id)
                .await
                .map_err(to_pyruntime_err)?;
            Ok(())
        })
    }

    #[pyo3(name = "subscribe_bars")]
    fn py_subscribe_bars<'py>(
        &self,
        py: Python<'py>,
        bar_type: BarType,
    ) -> PyResult<Bound<'py, PyAny>> {
        let client = self.clone();

        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            client
                .subscribe_bars(bar_type)
                .await
                .map_err(to_pyruntime_err)?;
            Ok(())
        })
    }

    #[pyo3(name = "subscribe_executions")]
    #[pyo3(signature = (snap_orders=true, snap_trades=true))]
    fn py_subscribe_executions<'py>(
        &self,
        py: Python<'py>,
        snap_orders: bool,
        snap_trades: bool,
    ) -> PyResult<Bound<'py, PyAny>> {
        let client = self.clone();

        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            client
                .subscribe_executions(snap_orders, snap_trades)
                .await
                .map_err(to_pyruntime_err)?;
            Ok(())
        })
    }

    #[pyo3(name = "unsubscribe_book")]
    fn py_unsubscribe_book<'py>(
        &self,
        py: Python<'py>,
        instrument_id: InstrumentId,
    ) -> PyResult<Bound<'py, PyAny>> {
        let client = self.clone();

        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            client
                .unsubscribe_book(instrument_id)
                .await
                .map_err(to_pyruntime_err)?;
            Ok(())
        })
    }

    #[pyo3(name = "unsubscribe_quotes")]
    fn py_unsubscribe_quotes<'py>(
        &self,
        py: Python<'py>,
        instrument_id: InstrumentId,
    ) -> PyResult<Bound<'py, PyAny>> {
        let client = self.clone();

        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            client
                .unsubscribe_quotes(instrument_id)
                .await
                .map_err(to_pyruntime_err)?;
            Ok(())
        })
    }

    #[pyo3(name = "unsubscribe_trades")]
    fn py_unsubscribe_trades<'py>(
        &self,
        py: Python<'py>,
        instrument_id: InstrumentId,
    ) -> PyResult<Bound<'py, PyAny>> {
        let client = self.clone();

        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            client
                .unsubscribe_trades(instrument_id)
                .await
                .map_err(to_pyruntime_err)?;
            Ok(())
        })
    }

    #[pyo3(name = "unsubscribe_bars")]
    fn py_unsubscribe_bars<'py>(
        &self,
        py: Python<'py>,
        bar_type: BarType,
    ) -> PyResult<Bound<'py, PyAny>> {
        let client = self.clone();

        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            client
                .unsubscribe_bars(bar_type)
                .await
                .map_err(to_pyruntime_err)?;
            Ok(())
        })
    }
}

/// Invokes `callback` with `py_obj` as its single positional argument.
///
/// The callback's return value is discarded. An exception raised by the callback is logged
/// at error level and not propagated to the caller.
pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
    match callback.call1(py, (py_obj,)) {
        Ok(_) => {}
        Err(e) => {
            tracing::error!("Error calling Python: {e}");
        }
    }
}
